{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "9f6562f1",
      "metadata": {
        "id": "9f6562f1"
      },
      "source": [
        "# Building Pipelines\n",
        "\n",
        "<a href=\"https://colab.research.google.com/github/lwa-project/bifrost/blob/master/tutorial/04_pipelines.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"></a>\n",
        "\n",
        "We now have all of the pieces needed to build a complete pipeline in the Bifrost framework.  Although it is hard to run a full multi-threaded Bifrost pipeline inside a Jupyter notebook we will look at a few examples."
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "%%capture install_log\n",
        "# Import bifrost, but attempt to auto-install if needed (and we're running on\n",
        "# Colab). If something goes wrong, evaluate install_log.show() in a new block\n",
        "# to retrieve the details.\n",
        "try:\n",
        "  import bifrost\n",
        "except ModuleNotFoundError as exn:\n",
        "  try:\n",
        "    import google.colab\n",
        "  except ModuleNotFoundError:\n",
        "    raise exn\n",
        "  !sudo apt-get -qq install universal-ctags libopenblas-dev software-properties-common build-essential\n",
        "  !pip install -q contextlib2 pint simplejson scipy git+https://github.com/ctypesgen/ctypesgen.git\n",
        "  ![ -d ~/bifrost/.git ] || git clone https://github.com/lwa-project/bifrost ~/bifrost\n",
        "  !(cd ~/bifrost && ./configure --disable-cuda && make -j all && sudo make install)\n",
        "  import bifrost"
      ],
      "metadata": {
        "id": "DQLLqB44cSUy"
      },
      "id": "DQLLqB44cSUy",
      "execution_count": 1,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "Let's start with a simple pipeline that generates random data in one block and writes it to disk in another.  The generator block looks like:"
      ],
      "metadata": {
        "id": "vzyE5ZgOcBwE"
      },
      "id": "vzyE5ZgOcBwE"
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "id": "423032b3",
      "metadata": {
        "id": "423032b3"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "import json\n",
        "import time\n",
        "import numpy\n",
        "import threading\n",
        "\n",
        "class GeneratorOp(object):\n",
        "    def __init__(self, oring, ntime_gulp=250, \n",
        "                 shutdown_event=None, core=None):\n",
        "        self.oring   = oring\n",
        "        self.ntime_gulp   = ntime_gulp\n",
        "        if shutdown_event is None:\n",
        "            shutdown_event = threading.Event()\n",
        "        self.shutdown_event = shutdown_event\n",
        "        self.core    = core\n",
        "        \n",
        "    def shutdown(self):\n",
        "        self.shutdown_event.set()\n",
        "          \n",
        "    def main(self):\n",
        "        with self.oring.begin_writing() as oring:\n",
        "            navg = 24\n",
        "            tint = navg / 25e3\n",
        "            tgulp = tint * self.ntime_gulp\n",
        "            nbeam = 1\n",
        "            chan0 = 1234\n",
        "            nchan = 16*184\n",
        "            npol = 4\n",
        "            \n",
        "            ohdr = {'time_tag': int(int(time.time())*196e6),\n",
        "                    'seq0':     0, \n",
        "                    'chan0':    chan0,\n",
        "                    'cfreq0':   chan0*25e3,\n",
        "                    'bw':       nchan*25e3,\n",
        "                    'navg':     navg,\n",
        "                    'nbeam':    nbeam,\n",
        "                    'nchan':    nchan,\n",
        "                    'npol':     npol,\n",
        "                    'pols':     'XX,YY,CR,CI',\n",
        "                    'complex':  False,\n",
        "                    'nbit':     32}\n",
        "            ohdr_str = json.dumps(ohdr)\n",
        "            \n",
        "            ogulp_size = self.ntime_gulp*nbeam*nchan*npol*4      # float32\n",
        "            oshape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "            self.oring.resize(ogulp_size)\n",
        "            \n",
        "            prev_time = time.time()\n",
        "            with oring.begin_sequence(time_tag=ohdr['time_tag'], header=ohdr_str) as oseq:\n",
        "                while not self.shutdown_event.is_set():\n",
        "                    with oseq.reserve(ogulp_size) as ospan:\n",
        "                        \n",
        "                        odata = ospan.data_view(numpy.float32).reshape(oshape)\n",
        "                        odata[...] = numpy.random.randn(*oshape)\n",
        "                        \n",
        "                        curr_time = time.time()\n",
        "                        while curr_time - prev_time < tgulp:\n",
        "                            time.sleep(0.01)\n",
        "                            curr_time = time.time()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "518b71ec",
      "metadata": {
        "id": "518b71ec"
      },
      "source": [
        "The `GeneratorOp` block is implemented as an object with a `main` method that is intended to be launched via `threading.Thread`.  This setup is consistent with the Bifrost asychronous model where each block is asynchronous but operations on spans/gulps within a particular block are synchronous.  The `__init__` method sets up the block and defines the ring that is to be used and the CPU core that the `main` method will be bound to when `main` is started.  The `main` method does the heavy lifting here.  Inside this method:\n",
        "\n",
        " 1. The output ring is prepared for writing.\n",
        " 2. The parameters of the generated data (number of time samples, number of channels, etc.) and other meta data are defined and dumped to a JSON object.\n",
        " 3.  The output sequence is started.\n",
        " 4.  The innermost loops starts and puts random data into the output sequence span by span until a shutdown event breaks out of this loop.  The speed of iteration through this inner loop is controlled by a call to `time.sleep` so that the data rate can be limited.\n",
        "\n",
        "The writer block looks like:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "id": "0bb566f8",
      "metadata": {
        "id": "0bb566f8"
      },
      "outputs": [],
      "source": [
        "class WriterOp(object):\n",
        "    def __init__(self, iring, ntime_gulp=250, guarantee=True, core=None):\n",
        "        self.iring      = iring\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.guarantee  = guarantee\n",
        "        self.core       = core\n",
        "        \n",
        "    def main(self):\n",
        "        for iseq in self.iring.read(guarantee=self.guarantee):\n",
        "            ihdr = json.loads(iseq.header.tostring())\n",
        "            \n",
        "            print(\"Writer: Start of new sequence:\", str(ihdr))\n",
        "            \n",
        "            time_tag = ihdr['time_tag']\n",
        "            navg     = ihdr['navg']\n",
        "            nbeam    = ihdr['nbeam']\n",
        "            chan0    = ihdr['chan0']\n",
        "            nchan    = ihdr['nchan']\n",
        "            chan_bw  = ihdr['bw'] / nchan\n",
        "            npol     = ihdr['npol']\n",
        "            pols     = ihdr['pols']\n",
        "            pols     = pols.replace('CR', 'XY_real')\n",
        "            pols     = pols.replace('CI', 'XY_imag')\n",
        "            \n",
        "            igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32\n",
        "            ishape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "            \n",
        "            prev_time = time.time()\n",
        "            iseq_spans = iseq.read(igulp_size)\n",
        "            for ispan in iseq_spans:\n",
        "                if ispan.size < igulp_size:\n",
        "                    continue # Ignore final gulp\n",
        "                    \n",
        "                idata = ispan.data_view(numpy.float32).reshape(ishape)\n",
        "                with open(f\"{time_tag}.dat\", 'wb') as fh:\n",
        "                    fh.write(idata.tobytes())\n",
        "                    print('  ', fh.name, '@', os.path.getsize(fh.name))\n",
        "                    \n",
        "                time_tag += navg * self.ntime_gulp * (int(196e6) // int(25e3))"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "e614dedf",
      "metadata": {
        "id": "e614dedf"
      },
      "source": [
        "The `WriterOp` block is structured in the same was as `GeneratorOp` block but uses the \"input ring\" structure for accessing data.\n",
        "\n",
        "To run these two blocks as a pipeline you would using the following inside a `if __name__ == '__main__':` block:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 5,
      "id": "7dfa4ef3",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "7dfa4ef3",
        "outputId": "b172367c-8c1a-4b99-d58b-f915a814fd2b"
      },
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Writer: Start of new sequence: {'time_tag': 324645856752000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}\n",
            "   324645856752000000.dat @ 11776000\n",
            "   324645856799040000.dat @ 11776000\n",
            "   324645856846080000.dat @ 11776000\n",
            "   324645856893120000.dat @ 11776000\n",
            "   324645856940160000.dat @ 11776000\n",
            "   324645856987200000.dat @ 11776000\n",
            "   324645857034240000.dat @ 11776000\n",
            "   324645857081280000.dat @ 11776000\n",
            "   324645857128320000.dat @ 11776000\n",
            "   324645857175360000.dat @ 11776000\n",
            "   324645857222400000.dat @ 11776000\n",
            "   324645857269440000.dat @ 11776000\n",
            "   324645857316480000.dat @ 11776000\n",
            "   324645857363520000.dat @ 11776000\n",
            "   324645857410560000.dat @ 11776000\n",
            "   324645857457600000.dat @ 11776000\n",
            "   324645857504640000.dat @ 11776000\n",
            "   324645857551680000.dat @ 11776000\n",
            "   324645857598720000.dat @ 11776000\n",
            "Done\n"
          ]
        }
      ],
      "source": [
        "write_ring = bifrost.ring.Ring(name=\"write\")\n",
        "\n",
        "ops = []\n",
        "ops.append(GeneratorOp(write_ring, ntime_gulp=250, core=0))\n",
        "ops.append(WriterOp(write_ring, ntime_gulp=250, core=1))\n",
        "\n",
        "threads = [threading.Thread(target=op.main) for op in ops]\n",
        "for thread in threads:\n",
        "    thread.start()\n",
        "    \n",
        "# Don't run forever\n",
        "time.sleep(3)\n",
        "ops[0].shutdown()\n",
        "\n",
        "for thread in threads:\n",
        "    thread.join()\n",
        "print(\"Done\")"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "abc15540",
      "metadata": {
        "id": "abc15540"
      },
      "source": [
        "This creates the ring that connects the two blocks, creates the `threading.Thread` instances that run each block, and starts the blocks.\n",
        "\n",
        "If you wanted to insert a third block between `GeneratorOp` and `WriterOp` it might look like:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 6,
      "id": "6519c569",
      "metadata": {
        "id": "6519c569"
      },
      "outputs": [],
      "source": [
        "class CopyOp(object):\n",
        "    def __init__(self, iring, oring, ntime_gulp=250, guarantee=True, core=-1):\n",
        "        self.iring = iring\n",
        "        self.oring = oring\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.guarantee = guarantee\n",
        "        self.core = core\n",
        "        \n",
        "    def main(self):\n",
        "        with self.oring.begin_writing() as oring:\n",
        "            for iseq in self.iring.read(guarantee=self.guarantee):\n",
        "                ihdr = json.loads(iseq.header.tostring())\n",
        "                \n",
        "                print(\"Copy: Start of new sequence:\", str(ihdr))\n",
        "                \n",
        "                time_tag = ihdr['time_tag']\n",
        "                navg     = ihdr['navg']\n",
        "                nbeam    = ihdr['nbeam']\n",
        "                chan0    = ihdr['chan0']\n",
        "                nchan    = ihdr['nchan']\n",
        "                chan_bw  = ihdr['bw'] / nchan\n",
        "                npol     = ihdr['npol']\n",
        "                pols     = ihdr['pols']\n",
        "                pols     = pols.replace('CR', 'XY_real')\n",
        "                pols     = pols.replace('CI', 'XY_imag')\n",
        "\n",
        "                igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32\n",
        "                ishape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "                self.iring.resize(igulp_size, igulp_size*5)\n",
        "                \n",
        "                ogulp_size = igulp_size\n",
        "                oshape = ishape\n",
        "                self.oring.resize(ogulp_size)\n",
        "                \n",
        "                ohdr = ihdr.copy()\n",
        "                ohdr_str = json.dumps(ohdr)\n",
        "                \n",
        "                iseq_spans = iseq.read(igulp_size)\n",
        "                with oring.begin_sequence(time_tag=time_tag, header=ohdr_str) as oseq:\n",
        "                    for ispan in iseq_spans:\n",
        "                        if ispan.size < igulp_size:\n",
        "                            continue # Ignore final gulp\n",
        "                            \n",
        "                        with oseq.reserve(ogulp_size) as ospan:\n",
        "                            idata = ispan.data_view(numpy.float32)\n",
        "                            odata = ospan.data_view(numpy.float32)    \n",
        "                            odata[...] = idata"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "8fc149b7",
      "metadata": {
        "id": "8fc149b7"
      },
      "source": [
        "This `CopyOp` block combines the characteristics of reading and writing from rings to copy data from one ring to the other."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 7,
      "id": "094130bc",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "094130bc",
        "outputId": "1f631a9f-5ba4-417e-cfc7-5bf644193787"
      },
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Copy: Start of new sequence: {'time_tag': 324645857340000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}\n",
            "Writer: Start of new sequence: {'time_tag': 324645857340000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', 'complex': False, 'nbit': 32}\n",
            "   324645857340000000.dat @ 11776000\n",
            "   324645857387040000.dat @ 11776000\n",
            "   324645857434080000.dat @ 11776000\n",
            "   324645857481120000.dat @ 11776000\n",
            "   324645857528160000.dat @ 11776000\n",
            "   324645857575200000.dat @ 11776000\n",
            "   324645857622240000.dat @ 11776000\n",
            "   324645857669280000.dat @ 11776000\n",
            "   324645857716320000.dat @ 11776000\n",
            "   324645857763360000.dat @ 11776000\n",
            "   324645857810400000.dat @ 11776000\n",
            "   324645857857440000.dat @ 11776000\n",
            "   324645857904480000.dat @ 11776000\n",
            "   324645857951520000.dat @ 11776000\n",
            "   324645857998560000.dat @ 11776000\n",
            "   324645858045600000.dat @ 11776000\n",
            "   324645858092640000.dat @ 11776000\n",
            "   324645858139680000.dat @ 11776000\n",
            "Done\n"
          ]
        }
      ],
      "source": [
        "copy_ring  = bifrost.ring.Ring(name=\"copy\")\n",
        "write_ring = bifrost.ring.Ring(name=\"write\")\n",
        "\n",
        "ops = []\n",
        "ops.append(GeneratorOp(copy_ring, ntime_gulp=250, core=0))\n",
        "ops.append(CopyOp(copy_ring, write_ring, ntime_gulp=250, core=1))\n",
        "ops.append(WriterOp(write_ring, ntime_gulp=250, core=2))\n",
        "\n",
        "threads = [threading.Thread(target=op.main) for op in ops]\n",
        "for thread in threads:\n",
        "    thread.start()\n",
        "    \n",
        "# Don't run forever\n",
        "time.sleep(3)\n",
        "ops[0].shutdown()\n",
        "\n",
        "for thread in threads:\n",
        "    thread.join()\n",
        "print(\"Done\")"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "916ef6f2",
      "metadata": {
        "id": "916ef6f2"
      },
      "source": [
        "These examples should provide a starting point for building pipelines in Bifrost.  Although the examples run purely on the CPU, GPU versions can also be create, either through copying to GPU memory within each block or by using rings in the `cuda` memory space.  If `cuda` rings the user needs to ensure that all memory copies to/from the ring are executed before the span is released.  Otherwise, corruption of the ring's contents can happen.  In the next section we will look at logging from inside the blocks to help understand performance."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    },
    "colab": {
      "name": "04_pipelines.ipynb",
      "provenance": []
    },
    "gpuClass": "standard"
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
